Bundle org.nuxeo.ecm.core.bulk
Documentation
-
README.md
nuxeo-core-bulk
About
This module provides the ability to execute actions asynchronously on a -possibly large- set of documents. This is done by leveraging Nuxeo Streams that bring scalability and fault tolerance.
Definitions
-
document set: a list of documents from a repository represented as a list of document identifiers.
-
action: an operation that can be applied to a document set.
-
command: a set of parameters building a request to apply an action on a document set.
-
bucket: a portion of a document set that fits into a stream record.
-
batch: a smaller (or equals) portion of a bucket where the action is applied within a transaction.
Requirements
To work properly the Bulk Service need a true KeyValue storage to store the command its status, there are 2 possibles choices:
- Use
RedisKeyValueStore
this is the case if you havenuxeo.redis.enabled=true
in your nuxeo.conf. - Use
MongoDBKeyValueStore
this is the case if you are using themongodb
template.
You should not rely on the default
MemKeyValueStore
implementation that flushes the data on restart.Bulk Command
The bulk command is the input of the framework. It is composed by the unique name of the action to execute, the NXQL query that materializes the document set, the user submitting the command, the repository and some optional parameters that could be needed by the action:
BulkCommand command = new BulkCommand.Builder("myAction", "SELECT * from Document") .repository("myRepository") .user("myUser") .param("param1", "myParam1") .param("param2", "myParam2") .build(); String commandId = Framework.getService(BulkService.class).submit(command);
Execution flow
The BulkService
The entry point is the
BulkService
that takes a bulk command as an input. The service submits this command, meaning it appends theBulkCommand
to thecommand
stream.The BulkService can also returns the status of a command which is internally stored into a KeyValueStore.
The Scroller computation
The
command
stream is the input of theScroller
computation.This computation scrolls the database using a cursor to retrieve the document ids matching the NXQL query. The ids are grouped into a bucket that fit into a record.
The
BulkBucket
record is appended to the action's stream.The scroller send command status update to inform that the scroll is in progress or terminated and to set the total number of document in the materialized document set.
Actions processors
Each action runs its own stream processor (a topology of computations).
The action processor must respect the following rules:
-
action must send a status update containing the number of processed documents since the last update.
-
action must handle possible error, for instance the user that send the command might not have write permission on all documents
-
the total number of processed documents reported must match at some point the number of documents in the document set.
-
action that aggregates bucket records per command must handle interleaved commands. This can be done by maintaining a local state for each command.
-
action that aggregates bucket records per command should checkpoint only when there no other interleaved command in progress. This is to prevent checkpoint while some records are not yet processed resulting in possible loss of record.
An
AbstractBulkComputation
is provided so an action can be implemented easily with a single computation SeeSetPropertiesAction
for a simple example.See The
CSVExportAction
and particularly theMakeBlob
computation for an advanced example.The Status computation
This computation reads from the
status
stream and aggregate status update to build the current status of command. The status is saved into a KeyValueStore. When the number of processed document is equals to the number of document in the set, the state is changed to completed. And the computation appends the final status to thedone
stream.This
done
stream can be used as an input by custom computation to execute other actions once a command is completed.How to contribute an action
You need to register a couple action / stream processor :
<extension target="org.nuxeo.ecm.core.bulk" point="actions"> <action name="myAction" bucketSize="100" batchSize="20"/> </extension>
<extension target="org.nuxeo.runtime.stream.service" point="streamProcessor"> <streamProcessor name="myAction" class="org.nuxeo.ecm.core.bulk.action.MyActionProcessor" defaultConcurrency="2" defaultPartitions="4" /> </extension>
See
SetPropertiesAction
for a very basic action implementation.You can find more info on how to configure a stream processor in the following link: https://github.com/nuxeo/nuxeo/tree/master/modules/runtime/nuxeo-runtime-stream#stream-processing
Testing a bulk action with REST API
Here is an example on how to launch a bulk command and get status:
## Run a bulk action curl -s -X POST 'http://localhost:8080/nuxeo/site/automation/Bulk.RunAction' -u Administrator:Administrator -H 'content-type: application/json+nxrequest' -d '{ "context": {}, "params": { "action": "csvExport", "query": "SELECT * FROM File WHERE ecm:isVersion = 0 AND ecm:isTrashed = 0", "parameters": {} } }' | tee /tmp/bulk-command.txt # {"commandId":"e8cc059d-6b9d-480b-a6e1-b0edace6d982"} ## Extract the command id from the output commandId=$(cat /tmp/bulk-command.txt | jq .[] | tr -d '"') ## Ask for the command status curl -s -X GET "http://localhost:8080/nuxeo/api/v1/bulk/$commandId" -u Administrator:Administrator -H 'content-type: application/json' | jq . # { # "entity-type": "bulkStatus", # "commandId": "e8cc059d-6b9d-480b-a6e1-b0edace6d982", # "state": "RUNNING", # "processed": 0, # "total": 1844, # "submitted": "2018-10-11T13:10:26.825Z", # "scrollStart": "2018-10-11T13:10:26.827Z", # "scrollEnd": "2018-10-11T13:10:26.846Z", # "completed": null #} ## Wait for the completion of the command, this is only for testing purpose ## a normal client should poll the status regularly instead of using this call: curl -X POST 'http://localhost:8080/nuxeo/site/automation/Bulk.WaitForAction' -u Administrator:Administrator -H 'content-type: application/json+nxrequest' -d $'{ "context": {}, "params": { "commandId": "'"$commandId"'", "timeoutSecond": "3600" } }' # {"entity-type":"boolean","value":true} ## Get the status again: curl -s -X GET "http://localhost:8080/nuxeo/api/v1/bulk/$commandId" -u Administrator:Administrator -H 'content-type: application/json' | jq . #{ # "entity-type": "bulkStatus", # "commandId": "e8cc059d-6b9d-480b-a6e1-b0edace6d982", # "state": "COMPLETED", # "processed": 1844, # "total": 1844, # "submitted": "2018-10-11T13:10:26.825Z", # "scrollStart": "2018-10-11T13:10:26.827Z", # "scrollEnd": "2018-10-11T13:10:26.846Z", # "completed": "2018-10-11T13:10:28.243Z" #}
Also a command can be aborted, this is useful for long running command launched by error, or to by pass a command that fails systematically which blocks the entire action processor:
## Abort a command curl -s -X PUT "http://localhost:8080/nuxeo/api/v1/bulk/$commandId/abort" -u Administrator:Administrator -H 'content-type: application/json' | jq .
Debugging
All streams used by the bulk service and action can be introspected using the Nuxeo
bin/stream.sh
script.For instance to see the latest commands submitted:
## When using Kafka ./bin/stream.sh tail -k -l bulk-command --codec avro ## When using Chronicle Queue # ./bin/stream.sh tail --chronicle ./nxserver/data/stream/bulk -l command --codec avro
offset watermark flag key length data bulk-command-01:+2 2018-10-11 11:18:34.955:0 [DEFAULT] setProperties 164 {"id": "b667b677-d40e-471a-8377-eb16dd301b42", "action": "setProperties", "query": "Select * from Document", "username": "Administrator", "repository": "default", "bucketSize": 100, "batchSize": 25, "params": "{"dc:description":"a new new testbulk description"}"} bulk-command-00:+2 2018-10-11 15:10:26.826:0 [DEFAULT] csvExport 151 {"id": "e8cc059d-6b9d-480b-a6e1-b0edace6d982", "action": "csvExport", "query": "SELECT * FROM File WHERE ecm:isVersion = 0 AND ecm:isTrashed = 0", "username": "Administrator", "repository": "default", "bucketSize": 100, "batchSize": 50, "params": null} To get the latest commands completed:
./bin/stream.sh tail -k -l bulk-done --codec avro
offset watermark flag key length data bulk-done-00:+4 2018-10-11 14:23:29.219:0 [DEFAULT] 580df47d-dd90-4d16-b23c-0e39ae363e06 96 {"commandId": "580df47d-dd90-4d16-b23c-0e39ae363e06", "action": "csvExport", "delta": false, "processed": 3873, "state": "COMPLETED", "submitTime": 1539260607207, "scrollStartTime": 1539260607275, "scrollEndTime": 1539260607326, "completedTime": 1539260609218, "total": 3873, "result": null} bulk-done-00:+5 2018-10-11 15:10:28.244:0 [DEFAULT] e8cc059d-6b9d-480b-a6e1-b0edace6d982 96 {"commandId": "e8cc059d-6b9d-480b-a6e1-b0edace6d982", "action": "csvExport", "delta": false, "processed": 1844, "state": "COMPLETED", "submitTime": 1539263426825, "scrollStartTime": 1539263426827, "scrollEndTime": 1539263426846, "completedTime": 1539263428243, "total": 1844, "result": null} Of course you can view the BulkBucket message
./bin/stream.sh tail -k -l bulk-csvExport --codec avro
offset watermark flag key length data bulk-csvExport-01:+48 2018-10-11 15:10:26.842:0 [DEFAULT] e8cc059d-6b9d-480b-a6e1-b0edace6d982:18 3750 {"commandId": "e8cc059d-6b9d-480b-a6e1-b0edace6d982", "ids": ["763135b8-ca49-4eea-9a52-1ceaa227e60a", ...]} And check for any lag on any computation, for more information on
stream.sh
:./bin/stream.sh help
About Nuxeo
Nuxeo dramatically improves how content-based applications are built, managed and deployed, making customers more agile, innovative and successful. Nuxeo provides a next generation, enterprise ready platform for building traditional and cutting-edge content oriented applications. Combining a powerful application development environment with SaaS-based tools and a modular architecture, the Nuxeo Platform and Products provide clear business value to some of the most recognizable brands including Verizon, Electronic Arts, Sharp, FICO, the U.S. Navy, and Boeing. Nuxeo is headquartered in New York and Paris. More information is available at www.nuxeo.com.
-
Resolution Order
You can influence this order by adding "require" tags in the component declaration, to make sure it is resolved after another component. It will also impact the order in which contributions are registered on their target extension point (see "Registration Order" on contributions).
Components
- org.nuxeo.ecm.core.scroll.service
- org.nuxeo.ecm.core.scroll.contrib.default
- org.nuxeo.ecm.core.bulk
- org.nuxeo.ecm.core.bulk.io
- org.nuxeo.ecm.core.bulk.config
Maven Artifact
File | nuxeo-core-bulk-2021.25.8.jar |
---|---|
Group Id | org.nuxeo.ecm.core |
Artifact Id | nuxeo-core-bulk |
Version | 2021.25.8 |
Manifest
Manifest-Version: 1.0
Archiver-Version: Plexus Archiver
Created-By: Apache Maven
Built-By: root
Build-Jdk: 11.0.16
Bundle-ManifestVersion: 1
Bundle-Version: 1.0.0
Bundle-Name: Nuxeo Core Bulk
Bundle-SymbolicName: org.nuxeo.ecm.core.bulk;singleton:=true
Nuxeo-Component: OSGI-INF/scroll-service.xml,OSGI-INF/scroll-contrib.x
ml,OSGI-INF/bulk-component.xml,OSGI-INF/bulk-config.xml,OSGI-INF/bulk
-io-contrib.xml
Exports
- Json Export Default Json serialization
- Json Graph Json dependency graph
- Json Contribution Stats Json statistics for contributions
- CSV Contribution Stats CSV statistics for contributions
Charts
-
Raw Data: Json Contribution Stats